package com.bjy.qa.agent.tools.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class MqttHelper {
    private static final Logger logger = LoggerFactory.getLogger(MqttHelper.class);

    private MqttQOS qos = MqttQOS.LEVEL_1; // QOS服务质量（默认LEVEL1）
    private MqttClient client; // mqtt 客户端实例
    private MqttListener mqttListener; // mqtt 请求回调类
    private Set topicSet = new HashSet(); // 当前订阅的主题列表

    private int connectionTimeout = 30; // 设置连接超时时间。该值以秒为单位
    private int keepAliveInterval = 60; // 心跳时间，设置“保持活动”间隔超过60秒断开。秒为单位，超过此时间没有收到消息回调 connectionLost 方法
    private boolean automaticReconnect = true; // 设置连接丢失后是否会自动重连，首先等待1秒，之后延迟将加倍，直到达到2分钟。

    public MqttQOS getQos() {
        return qos;
    }

    public void setQos(MqttQOS qos) {
        this.qos = qos;
    }

    public MqttListener getMqttListener() {
        return mqttListener;
    }

    public void setMqttListener(MqttListener mqttListener) {
        this.mqttListener = mqttListener;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public int getKeepAliveInterval() {
        return keepAliveInterval;
    }

    public void setKeepAliveInterval(int keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }

    public boolean isAutomaticReconnect() {
        return automaticReconnect;
    }

    public void setAutomaticReconnect(boolean automaticReconnect) {
        this.automaticReconnect = automaticReconnect;
    }

    /**
     * mqtt请求回调类
     */
    public interface MqttListener {
        /**
         * 服务器连接成功时回调此方法，默认重连后会自动订阅之前的topic
         * @param reconnect true:是自动重连的结果
         * @param serverURI 连接到服务器地址
         */
        public void connectComplete(boolean reconnect, String serverURI);


        /**
         * 连接断开是回调此方法，主要用于重连
         * @param cause 导致连接丢失的原因
         */
        public void connectionLost(Throwable cause);

        /**
         * 收到订阅的消息会回调此方法
         * @param topic 主题
         * @param message 收到的消息
         * @throws Exception
         */
        public void messageArrived(String topic, MqttMessage message) throws Exception;

        /**
         * 消息传递完成会回调此方法
         * Qos 0：当消息发出后回调
         * QoS 1：当收到 PUBACK 后回调
         * QoS 2：当收到 PUBCOMP 后回调
         * @param token 该令牌与消息发布时返回的令牌相同，可以使用 token.getMessageId() 获得消息 id 或 token.getMessage() 获得消息内容
         */
        public void deliveryComplete(IMqttDeliveryToken token);
    }

    /**
     * 获取 mqtt 实例
     * @param url mqtt的地址，例如："tcp://39.105.36.154:1883"
     * @param clientId 客户端的id标识，重复之前的客户端会被挤下来
     * @return
     * @throws MqttException
     */
    public MqttHelper getInstance(String url, String clientId) throws MqttException {
        return getInstance(url, clientId, null, null);
    }

    /**
     * 获取 mqtt 实例
     * @param url mqtt的地址，例如："tcp://39.105.36.154:1883"
     * @param clientId 客户端的id标识，重复之前的客户端会被挤下来
     * @param userName 如果需要验证输入用户名
     * @param passWord 如果需要验证输入密码
     * @return
     * @throws MqttException
     */
    public MqttHelper getInstance(String url, String clientId, String userName, String passWord) throws MqttException {
        MemoryPersistence persistence = new MemoryPersistence(); // MemoryPersistence设置clientid的保存形式，默认为以内存保存
        client = new MqttClient(url, clientId, persistence);

        // MQTT 连接选项
        MqttConnectOptions connOpts = new MqttConnectOptions();
        if (userName != null && passWord != null) { // 需要用户验证
            connOpts.setUserName(userName);
            connOpts.setPassword(passWord.toCharArray());
        }
        connOpts.setCleanSession(true); // 保留会话
        connOpts.setConnectionTimeout(connectionTimeout);
        connOpts.setKeepAliveInterval(keepAliveInterval);
        connOpts.setAutomaticReconnect(automaticReconnect);

        client.connect(connOpts); // 建立连接
        logger.debug("连接 MQTT 服务器成功：" + url);
        return this;
    }

    /**
     * 发布消息
     * @param topic 主题
     * @param message 发送的消息
     * @return
     */
    public boolean publish(String topic, String message) throws MqttException, InterruptedException {
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(this.qos.getValue());
        client.publish(topic, mqttMessage);
        logger.debug("发布消息成功，topic:" + topic + ", message:" + message);
        return true;
    }

    /**
     * 订阅消息
     * @param mqttListener 订阅后的回调
     * @param topics 订阅的主题，支持同时订阅多个主题，例：订阅两个主题就是 "topic1", "topic2" 逗号分割就好
     * @throws MqttException
     */
    public void subscribe(MqttListener mqttListener, String... topics) throws MqttException {
        int qoss[] = new int[topics.length];
        for (int i = 0; i < qoss.length; i++) {
            qoss[i] = this.qos.getValue();
        }

        this.mqttListener = mqttListener;
        client.subscribe(topics, qoss);
        client.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                try {
                    if (reconnect) {
                        logger.debug("MQTT 服务器断开重连成功：" + serverURI);
                    } else {
                        logger.debug("连接 MQTT 服务器成功：" + serverURI);
                    }
                    subscribe(mqttListener, (String[])topicSet.toArray(new String[0]));
                    mqttListener.connectComplete(reconnect, serverURI);
                } catch (MqttException e) {
                    logger.error("订阅失败，topic: {}. {}", topicSet, e);
                }
            }

            @Override
            public void connectionLost(Throwable cause) {
                logger.debug("mqtt 连接断开, 原因:" + cause);
                mqttListener.connectionLost(cause);
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                logger.debug("收到订阅的消息, topic:" + topic + ", message:" + message);
                mqttListener.messageArrived(topic, message);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                logger.debug("消息传递完成, token:" + token);
                mqttListener.deliveryComplete(token);
            }
        });
        for (String topic : topics) { // 添加到 当前订阅的主题列表
            this.topicSet.add(topic);
        }
        logger.debug("订阅成功，topic:" + Arrays.toString(topics));
    }

    /**
     * 取消订阅
     * @param topics 取消订阅的主题，支持同时取消订阅多个主题，例：取消订阅两个主题就是 "topic1", "topic2" 逗号分割就好
     * @throws MqttException
     */
    public void unsubscribe(String... topics) throws MqttException {
        client.unsubscribe(topics);

        for (String topic : topics) { // 从当前订阅的主题列表 删除topic
            this.topicSet.remove(topic);
        }
        logger.debug("取消订阅成功，topic:" + Arrays.toString(topics));
    }

    /**
     * 关闭 mqtt 连接
     * @return
     * @throws MqttException
     */
    public boolean close() throws MqttException {
        String url = client.getServerURI();
        client.disconnect();
        client.close();
        logger.debug("断开 MQTT 服务器成功: "+ url);
        this.topicSet.clear(); // 清除当前订阅列表
        return true;
    }

    /**
     * 返回 mqtt 是否连接到服务器
     * @return
     */
    public boolean isConnected() {
        return this.client.isConnected();
    }

}
